from neo4j.v1 import GraphDatabase
from cassandra.cluster import Cluster
import matplotlib.pyplot as plt
from bson import SON
from pymongo import MongoClient, GEOSPHERE, ASCENDING, DESCENDING
from datetime import datetime
import pandas as pd
import folium
import cufflinks as cf
from folium import plugins
from pandas.plotting import scatter_matrix
import matplotlib.pyplot as plt
from sklearn import model_selection
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.cluster import KMeans
from sklearn import datasets
import numpy as np
cf.set_config_file(world_readable=True,offline=True)
import warnings
warnings.filterwarnings('ignore')
#-----------------------------------------------------------------MONGODB FUNCTIONS---------------------------------------------------------------
def get_session_mon(db_name):
"""
:param db_name: Nombre de la base de datos de los incidentes y los distritos
:return: Objeto de tipo DataBase con una conexión a la base de datos local
"""
client = MongoClient()
db = client[db_name]
return db
def create_indexes_mon(db):
"""
GEOSPHERE: para procesar coordinadas esféricas
:param db: Añade un Ãndice sobre los campos que contienen información geoespacial
"""
db.incidents.create_index([("Location", GEOSPHERE)])
db.neighbours.create_index([("the_geom", GEOSPHERE)])
db.incidents.create_indexes(["Date", ASCENDING])
def first_query_mon(db):
"""
:param db: referencia a la sesión de la bd
:return: Esta función obtiene los incidentes que están a una distancia máximo de 1000 metros
desde un punto representado por coordinadas geográficas en formato geojson
"""
# Montamos la query
query_incidents = {
"Location": {
"$near": {
"$geometry": SON([
("type", "Point"),
("coordinates", [-122.42158168136999, 37.7617007179518])
]),
"$maxDistance": 1000
}
}
}
# Ejecutamos la querry sobre la colección de incidencias
query_results = db.incidents.find(query_incidents)
df = pd.DataFrame(list(query_results))
return df
def second_query_mon(db):
"""
:param db: referencia a la sesión de la bd
:return: devuelve el distrito que contiene las coordinadas usadas en el
operado de intersección
"""
# Montamos la query
query_distrito = {"the_geom":
{"$geoIntersects":
{"$geometry": SON([
("type", "Point"),
("coordinates", [-122.42158168136999, 37.7617007179518])])
}
}
}
# Ejecutamos la querry sobre la colección de incidencias
query_results = db.neighbours.find_one(query_distrito)
return query_results
def third_query_mon(db):
"""
:param db: referencia a la sesión de la bd
:return: Devuelve el todos los incidentes, en dataframe, de un distrito, usando
operadores geo-espaciales, la consulta se realiza en fases sobre las dos
colecciones, primero encontramos el distrito, y luego buscamos todos los
incidentes que tienen las coordinadas dentro del polÃgono
"""
# Empezamos con el distrito
query_distrito = {"the_geom": {"$geoIntersects": {
"$geometry": SON([("type", "Point"), ("coordinates", [-122.42158168136999, 37.7617007179518])])}}}
distrito = db.neighbours.find_one(query_distrito)
# Ahora encontramos los incidentes
query_incidents = {"Location": {"$geoWithin": {"$geometry": distrito['the_geom']}}}
query_results = db.incidents.find(query_incidents)
df = pd.DataFrame(list(query_results))
return df
def since_february_mon(db):
"""
:param db: referencia a la sesión de la bd
:return: Devuelve los incidentes que han ocurrido desde febrero de 2018
"""
fecha = datetime(2018, 2, 1)
incidents = db.incidents.find({"Date": {"$gt": fecha}})
df = pd.DataFrame(list(incidents))
return df
def date_querry_mon(op, sdate, edate):
"""
:param opr: operador B: between dates, L: less than, G: greater than
:return: devuelve el filtro de la consulta según lo que se recibe por parámetro
en op
"""
switcher = {
'B': {"Date": {"$gte": sdate, "$lte": edate}}, # Between
'GE': {"Date": {"$gte": sdate}}, # Greater than or equal
'LE': {"Date": {"$lte": sdate}}, # less than or equal
}
return switcher.get(op)
def generic_date_search_mon(db, op, sdate, *args, **kwargs):
"""
:param db: referencia a la sesión de bd
:param op: operador para seleccionar
:param sdate: primera fecha, mandaterio
:param args:
:param kwargs: Contiene argumento opcional de la seguna fecha
:return: incidentes que cumplen con el filtro sobre fechas
"""
edate = kwargs.get('edate', None)
if not edate is None:
query = date_querry_mon(op, sdate, edate)
else:
query = date_querry_mon(op, sdate, None)
query_results = db.incidents.find(query)
df = pd.DataFrame(list(query_results))
return df
def total_per_category(db):
"""
Esta función usa el framework de aggregate para obtener el total de cada categorÃa
:param db: referencia a la sesión de bd
:return: Dataframe composed of two columns, categories and count of each category
"""
#Agrupamos por categorÃa y los contamos, y luego ordenamos
pipeline = [
{"$group": {"_id": "$Category", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1), ("_id", -1)])}
]
aggregate_results = db.incidents.aggregate(pipeline)
df = pd.DataFrame(list(aggregate_results))
return df
def total_per_hour(db):
"""
Esta función usa el framework de aggregate para obtener el total de incidentes de cada hora
:param db: referencia a la sesión de bd
:return: Dataframe composed of two columns, hours of the day and count of total incidents in that hour
"""
# Primero proyectamos cada documento en otro sacando solo el _id y la hora
pipeline = [
{"$project": { "h": { "$hour": "$Date" } }}, # nuevo campo h que ha sido proyecto desde la fecha
{"$group": {"_id": "$h", "count": {"$sum": 1}}},
{"$sort": SON([("_id", 1), ("count", 1)])}
]
aggregate_results = db.incidents.aggregate(pipeline)
df = pd.DataFrame(list(aggregate_results))
return df
def draw_map_mon(ds):
"""
Esta función recibe un conjunto de incidentes y los dibuja en un mapa usando el paquete folium,
el mapa se guarda en un fichero.
:param ds: dataset de incidentes en formato de DataFrame
"""
incid_map = folium.Map(location=[37.7617007179518, -122.42158168136999], zoom_start=11, tiles='Stamen Terrain')
marker_cluster = plugins.MarkerCluster().add_to(incid_map)
for name, row in ds.iterrows():
folium.Marker([row["Y"], row["X"]], popup=row["Descript"]).add_to(marker_cluster)
incid_map.save('incidents.html')
return incid_map
def draw_heatmap_mon(df):
"""
Esta función recibe un conjunto de incidentes y los dibuja en un mapa de calor que
se guarda en un fichero html
:param ds: dataset de incidentes en formato de DataFrame
"""
heat_map = folium.Map(location=[37.7617007179518, -122.42158168136999], zoom_start=11, tiles='Stamen Terrain')
heat_map.add_child(plugins.HeatMap([[row["Y"], row["X"]] for name, row in df.iterrows()]))
heat_map.save('heat_map_incidets.html')
return heat_map
#-----------------------------------------------------------------MONGODB FUNCTIONS---------------------------------------------------------------
#-----------------------------------------------------------------CASSANDRA FUNCTIONS-------------------------------------------------------------
# SOME VARIABLES NEEDED FOR CONNECT TO THE DATABASE AND RETRIEVE DATA
cluster_cas = Cluster(['127.0.0.1'])
session_cas = cluster_cas.connect()
session_cas.set_keyspace('incidencias')
#ver todas las categorias
def get_categories_cas():
rows_categories = session_cas.execute('SELECT category FROM categorias')
categories = []
for category in rows_categories:
categories.append(category.category)
return categories
"""
categories_cas = get_categories_cas()
for category in categories_cas:
print (category)
"""
#ver todas los distritos
def get_districts_cas():
rows_districts = session_cas.execute('SELECT pddistrict FROM distritos')
districts = []
for district in rows_districts:
districts.append(district.pddistrict)
return districts
"""
districts_cas = get_districts_cas()
for district in districts_cas:
print(district)
"""
#cantidad distritos
def get_num_districts_cas():
num_districts = session_cas.execute('SELECT count(*) FROM distritos')
return num_districts[0].count
"""
num_districts_cas = get_num_districts_cas()
print(num_districts_cas)
"""
#Listar incidencias según el distrito'
def get_incidents_by_district_cas(district):
rows_incidents = session_cas.execute('SELECT * FROM incidencias.incidenciasbyzona WHERE pddistrict=%s',[district])
return rows_incidents
"""
rows_incidents_cas = get_incidents_by_district_cas('NORTHERN')
for incident in rows_incidents_cas:
print (incident.pdid)
"""
#Número incidencias según el distrito'
def get_num_incidents_by_district_cas(district):
num_incidents = session_cas.execute('SELECT COUNT(*) FROM incidencias.incidenciasbyzona WHERE pddistrict=%s',[district])
return num_incidents[0].count
"""
num_incidents_cas = get_num_incidents_by_district_cas('NORTHERN')
print(num_incidents_cas)
"""
#Listar incidencias según el distrito y la categoria
def get_incidents_by_category_district_cas(category,district):
rows_incidents = session_cas.execute('SELECT * FROM incidencias.incidenciasbycategoriazona WHERE Category=%s AND pddistrict=%s',[category,district])
return rows_incidents
"""
rows_incidents_cas = get_incidents_by_category_district_cas('SUICIDE','BAYVIEW')
for incident in rows_incidents_cas:
print (incident.pdid)
"""
#Número de incidencias según el distrito y la categoria
def get_num_incidents_by_category_district_cas(category,district):
num_incidents = session_cas.execute('SELECT COUNT(*) FROM incidencias.incidenciasbycategoriazona WHERE Category=%s AND pddistrict=%s',["SUICIDE","BAYVIEW"])
return num_incidents[0].count
"""
num_incidents_cas = get_num_incidents_by_category_district_cas('SUICIDE','BAYVIEW')
print(num_incidents_cas)
"""
#Listar incidencias según el distrito, la categoria, fecha inicio y fin
def get_incidents_by_category_district_betweendate_cas(category,district,date_start,date_end):
rows_incidents = session_cas.execute('SELECT * FROM incidencias.incidenciasbycategoriazonafecha WHERE Category=%s AND pddistrict=%s AND date >= %s AND date <= %s',["SUICIDE","BAYVIEW","2017-06-01","2018-08-01"])
return rows_incidents
"""
rows_incidents_cas = get_incidents_by_category_district_betweendate_cas('SUICIDE','BAYVIEW','2017-06-01','2018-08-01')
for incident in rows_incidents_cas:
print (incident.pdid)
"""
#Número de incidencias según el distrito, la categoria, fecha inicio y fin
def get_num_incidents_by_category_district_betweendate_cas(category,district,date_start,date_end):
num_incidents = session_cas.execute('SELECT COUNT(*) FROM incidencias.incidenciasbycategoriazonafecha WHERE Category=%s AND pddistrict=%s AND date >= %s AND date <= %s',["SUICIDE","BAYVIEW","2017-06-01","2018-08-01"])
return num_incidents[0].count
"""
num_incidents_cas = get_num_incidents_by_category_district_betweendate_cas('SUICIDE','BAYVIEW','2017-06-01','2018-08-01')
print(num_incidents_cas)
"""
#-----------------------------------------------------------------CASSANDRA FUNCTIONS-------------------------------------------------------------
#-----------------------------------------------------------------NEO4J FUNCTIONS-----------------------------------------------------------------
# SOME VARIABLES NEEDED FOR CONNECT TO THE DATABASE AND RETRIEVE DATA
uri_neo = "bolt://localhost:7687" # DATABASE URI
driver_neo = GraphDatabase.driver(uri_neo, auth=("neo4j", "admin")) # DRIVER NEO4J
session_neo = driver_neo.session() # SESSION
tx_neo = session_neo.begin_transaction() # TRANSACTION
# WE DEFINE SOME METHODS
def get_districts_neo(): # DISTRICTS LIST
list_dis = []
for record in tx_neo.run("MATCH (dis:District) RETURN dis.district"):
aux = record["dis.district"]
# print(aux)
list_dis.append(aux)
return list_dis
def get_count_districts_neo(): # NUMBER OF DISTRICTS
record = tx_neo.run("MATCH (dis:District) RETURN count(dis)")
count = record.single()[0]
return count
def get_count_incidents_district_neo(district): # NUMBER OF INCIDENTS IN A DISTRICT
record = tx_neo.run("MATCH (dis:District {district: $district}) RETURN size(()-[:WHERE]->(dis))", district=district)
count = record.single()[0]
return count
def get_categories_neo(): # CATEGORIES LIST
list_cat = []
for record in tx_neo.run("MATCH (cat:Category) RETURN cat.category"):
aux = record["cat.category"]
# print(aux)
list_cat.append(aux)
return list_cat
def get_count_categories_neo(): # NUMBER OF CATEGORIES
record = tx_neo.run("MATCH (cat:Category) RETURN count(cat)")
count = record.single()[0]
return count
def get_count_incidents_category_neo(category): # NUMBER OF INCIDENTS OF A CATEGORY
record = tx_neo.run("MATCH (cat:Category {category: $category}) RETURN size(()-[:WHAT]->(cat))", category=category)
count = record.single()[0]
return count
def get_count_incidents_district_category_neo(district, category): # NUMBER OF INCIDENTS IN A DISTRICT OF A CATEGORY
record = tx_neo.run(
"MATCH (cat:Category {category: $category}), (dis:District {district: $district}) RETURN size((cat)<-["
":WHAT]-()-[:WHERE]->(dis))",
district=district, category=category)
count = record.single()[0]
return count
#-----------------------------------------------------------------NEO4J---------------------------------------------------------------------------
#----------MONGODB------------
sfdb_mon = get_session_mon("san_francisco_incidents") # Obtenemos una sesión con la bd usando su nombre
fq_mon = first_query_mon(sfdb_mon) # Consulta geoespacial con operador $near
sq_mon = second_query_mon(sfdb_mon) # Consulta geoespacial con el operador $geoIntersects
tq_mon = third_query_mon(sfdb_mon) # Consulta geoespacial, devuelve todos los incidentes que pertenecen a un distrito
# Fechas para filtro de fechas
fecha1_mon = datetime(2017, 12, 1)
fecha2_mon = datetime(2017, 12, 31)
# Buscaremos los incidentes entre dos fechas, B=Between (ver doc de la función)
date_results_mon = generic_date_search_mon(sfdb_mon, 'B', fecha1_mon, edate=fecha2_mon)
# Buscamos todos los incidentes de febrero
feb_mon = since_february_mon(sfdb_mon) # Consulta especÃfica a una fecha
# Generamos un mapa con los miles primeros incidentes
m_mon = draw_map_mon(feb_mon.iloc[:1000])
hm_mon = draw_heatmap_mon(date_results_mon.iloc[:1000])
m_mon
hm_mon
# Obtenemos el total de incidentes por categorÃa y lo mostramos en un mapa de tipo PIE
df_mon = total_per_category(sfdb_mon)
df_mon.head()
df_mon.iplot(kind="pie", labels="_id", values="count")
# Obtenemos el total de incidentes por cada hora y lo mostramos en un diagrama de barras
dfh_mon = total_per_hour(sfdb_mon)
dfh_mon = dfh_mon.rename(index=str, columns={"_id": "Hour", "count": "Total Incidents"})
dfh_mon.head()
dfh_mon.iplot(kind='bar', filename='cufflinks/bar-chart-row')
print("CASSANDRA --> Number of districts: ", get_num_districts_cas())
print("CASSANDRA --> Number of incidents in NORTHERN: ", get_num_incidents_by_district_cas('NORTHERN'))
print("CASSANDRA --> Number of cases of SUICIDE in BAYVIEW between 2017-06-01 and 2018-08-01: ", get_num_incidents_by_category_district_betweendate_cas('SUICIDE','BAYVIEW','2017-06-01','2018-08-01'))
#Plot
districts_cas = get_districts_cas()
list_inc_cas = []
for dis in districts_cas: # RETRIVE THE COUNT OF INCIDENTS FOR EACH DISTRICT
list_inc_cas.append(get_num_incidents_by_district_cas(dis))
fig_cas = plt.figure()
plot_cas = fig_cas.add_subplot(111)
xx_cas = range(1, len(list_inc_cas)+1)
# BAR CHART
plot_cas.bar(xx_cas, list_inc_cas, width=0.5)
plot_cas.set_xticks(xx_cas)
plot_cas.set_xticklabels(districts_cas)
plot_cas.set_title('Incidents by district')
plot_cas.set_xlabel('Districts')
plot_cas.set_ylabel('Number of incidents')
plot_cas.tick_params(axis='both', which='major', labelsize=6)
plt.show() # SHOW THE BAR CHART
fig_cas.savefig('incidents_by_district_cas.png') # SAVE THE CHART
districts_neo = get_districts_neo()
n_districts_neo = get_count_districts_neo()
print("NEO --> Number of districts: " + str(n_districts_neo))
print("NEO --> " + str(get_count_incidents_district_neo("TENDERLOIN")))
categories_neo = get_categories_neo()
n_categories_neo = get_count_categories_neo()
print("NEO --> Number of categories: " + str(n_categories_neo))
# print("NEO --> " + get_count_incidents_category_neo("DRUNKENNESS"))
# print("NEO --> " + get_count_incidents_district_category_neo("BAYVIEW", "DRUG/NARCOTIC"))
# LET'S PLOT A CHART!!
list_inc_neo = []
for dis in districts_neo: # RETRIVE THE COUNT OF INCIDENTS FOR EACH DISTRICT
list_inc_neo.append(get_count_incidents_district_neo(dis))
fig_neo = plt.figure()
plot_neo = fig_neo.add_subplot(111)
xx_neo = range(1, len(list_inc_neo)+1)
# BAR CHART
plot_neo.bar(xx_neo, list_inc_neo, width=0.5)
plot_neo.set_xticks(xx_neo)
plot_neo.set_xticklabels(districts_neo)
plot_neo.set_title('Incidents by district')
plot_neo.set_xlabel('Districts')
plot_neo.set_ylabel('Number of incidents')
plot_neo.tick_params(axis='both', which='major', labelsize=6)
plt.show() # SHOW THE BAR CHART
fig_neo.savefig('incidents_by_district_neo.png') # SAVE THE CHART
# CLOSE DATABASE SESSION
session_neo.close()
Preparamos los datos que vamos a usar. En este caso vamos a seleccionar de la base de datos de MongoDB los 10000 últimos incidentes. No vamos a seleccionar todos los campos, solo nos interesan los campos de distrito, categorÃa, resolución y dÃa de la semana. Haremos una limpieza sobre estos datos para prepararlos a ser procesados por los algoritmos de aprendizaje.
# Especificamos los campos que queremos para seleccionar desde MongoDB
target_fields = {
'Category':1,
'DayOfWeek':1,
'PdDistrict':1,
'Resolution':1
}
# Select lo hacemos usando el primero sort para ordenar según la fecha y luego limitamos sólo a 10000
incidents_ml = sfdb_mon.incidents.find({},target_fields).sort('Date', DESCENDING).limit(100000);
df_ml = pd.DataFrame(list(incidents_ml))
df_ml = df_ml.drop(['_id'], axis=1) # Quitamos el campo _id que siempre viene por defecto
df_ml.__len__() # Vemos el tamaño del df que debe ser 10000
df_ml.head()
Primero vamos a limpiar los datos, convertiendo los campos de tipo string a tipo Categorical de python, de esta forma obtenemos un tipo numérico que es más adecuado para el tratamiento de datos. Usaremos un dataframe para la clasificación y otro para el clustering. Vamos a clasificar según el campo de CategorÃa, por lo que no lo vamos a convertir
class_df = df_ml.copy() # Dataframe de clasificación
# Convertimos los demás campos a tipo CategorÃa de pandas
class_df.PdDistrict = pd.Categorical(class_df.PdDistrict)
class_df.DayOfWeek = pd.Categorical(class_df.DayOfWeek)
class_df.Resolution = pd.Categorical(class_df.Resolution)
# En vez de Strings vamos a guardar los códigos en los valores
class_df['PdDistrict'] = class_df.PdDistrict.cat.codes
class_df['DayOfWeek'] = class_df.DayOfWeek.cat.codes
class_df['Resolution'] = class_df.Resolution.cat.codes
class_df.head()
# Split-out validation dataset
array = class_df.values
X = array[:,1:4]
Y = array[:,0] # El campo categorÃa es el primero
validation_size = 0.20 # Usamos 20% para la validación y el resto para el entrenamiento
seed = 7
X_train, X_validation, Y_train, Y_validation = model_selection.train_test_split(X, Y, test_size=validation_size, random_state=seed)
scoring = 'accuracy'
# Spot Check Algorithms
models = []
models.append(('LR', LogisticRegression()))
models.append(('LDA', LinearDiscriminantAnalysis()))
models.append(('KNN', KNeighborsClassifier()))
models.append(('CART', DecisionTreeClassifier()))
models.append(('NB', GaussianNB()))
# evaluate each model in turn
results = []
names = []
for name, model in models:
kfold = model_selection.KFold(n_splits=10, random_state=seed)
cv_results = model_selection.cross_val_score(model, X_train, Y_train, cv=kfold, scoring=scoring)
results.append(cv_results)
names.append(name)
msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
print(msg)
# Comparamos los algoritmos
fig = plt.figure()
fig.suptitle('Algorithm Comparison')
ax = fig.add_subplot(111)
plt.boxplot(results)
ax.set_xticklabels(names)
plt.show()
# Viendo el resultado, el CART ha sido el mejor
CART = DecisionTreeClassifier()
CART.fit(X_train, Y_train)
predictions = CART.predict(X_validation)
print(accuracy_score(Y_validation, predictions))
print(confusion_matrix(Y_validation, predictions))
print(classification_report(Y_validation, predictions))
El clustering lo vamos a hacer sobre le campo de Distrito y vamos a intentar crear 10 clusters.
# Dataframe del cluster, copiado del dataframe original, y hay que hacer el mismo tratamiento
# y limpiza que se le ha hecho al de clasificación
df_cluster = df_ml.copy()
df_target = df_ml[['PdDistrict']]
df_cluster = df_cluster.drop(['PdDistrict'], axis=1)
df_cluster.head()
# Convertimos a tipo Categorical y luego cambiamos los valores a tipo numérico
df_cluster.Category = pd.Categorical(df_cluster.Category)
df_cluster.DayOfWeek = pd.Categorical(df_cluster.DayOfWeek)
df_cluster.Resolution = pd.Categorical(df_cluster.Resolution)
df_target.PdDistrict = pd.Categorical(df_target.PdDistrict)
df_cluster['Category'] = df_cluster.Category.cat.codes
df_cluster['DayOfWeek'] = df_cluster.DayOfWeek.cat.codes
df_cluster['Resolution'] = df_cluster.Resolution.cat.codes
df_target['PdDistrict'] = df_target.PdDistrict.cat.codes
# Creamos un array de colores, un color por cada distrito
colors = np.array(['red', 'green', 'blue','pink', 'yellow', 'purple', 'black', 'grey', 'brown', 'orange'])
# Pasamos a ajustar un modelo usando Kmeans con un k=10
model = KMeans(n_clusters=10)
model.fit(df_cluster)
model.labels_
# The fudge to reorder the cluster ids.
predictedY = np.choose(model.labels_, [0,1,2,3,4,5,6,7,8,9]).astype(np.int64)
# Ahora mostramos el resultado después de haber ajustado el modelo a los datos, vamosa ver cómo quedan
# los datos de categorÃa con respecto al dÃa de la semana antes y después del ajuste.
plt.subplot(1, 2, 1)
plt.scatter(df_cluster['DayOfWeek'], df_cluster['Category'], c=colors[df_target['PdDistrict']], s=40 )
plt.title('Before classification')
# Plot the classifications according to the model
plt.subplot(1, 2, 2)
plt.scatter(df_cluster['DayOfWeek'], df_cluster['Category'], c=colors[predictedY], s=40)
plt.title("Model's classification")